Skip to content

BREAKING: Converts to Swift Concurrency #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

NeedleInAJayStack
Copy link
Member

This removes the NIO dependency. It is breaking because it removes all Swift NIO-isms that were present in the public APIs (like EventLoopFuture and EventLoopGroup argument/return types).

paulofaria
paulofaria previously approved these changes Jun 24, 2025
.package(url: "https://github.com/apple/swift-collections", .upToNextMajor(from: "1.0.0")),
],
targets: [
.target(
name: "GraphQL",
dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "OrderedCollections", package: "swift-collections"),
]
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe enable strict concurrency as well

    swiftLanguageVersions: [.v5, .version("6")]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NeedleInAJayStack have you tried enabling strict concurrency?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has a quick look at this. The fact that you throw Any around a lot is going to make it very difficult to enable strict concurrency

let fieldASTs = field.value
let fieldPath = path.appending(field.key)
// TODO: Probably need to lock around results access here.
results[field.key] = try await resolveField(
Copy link
Contributor

@adam-fowler adam-fowler Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is going to cause issues. Best way to do this is have the child task return an index and result and fill out the ordered dictionary when parsing the results of the group.

group.addTask {
    ...
    return (key: field.key, result: try await resolveField(...))
}
for try await result in group {
    results[result.key] = result.result
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah perfect, thanks for the insight on how best to do this. I've adjusted to this approach.


completedResults.append(completedItem)
// TODO: Probably need to block around completedResults access here
completedResults[index] = completedItem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above, about returning index with result to group

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed this also

@@ -3,7 +3,7 @@ open class EventStream<Element> {
public init() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to concurrency, but should this not be a protocol

@@ -30,13 +30,13 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {

@available(macOS 10.15, iOS 15, watchOS 8, tvOS 15, *)
extension AsyncThrowingStream {
func mapStream<To>(_ closure: @escaping (Element) throws -> To)
func mapStream<To>(_ closure: @escaping (Element) async throws -> To)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you have this when AsyncSequence.map exists?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unstructured Tasks aren't great. They make cancellation hard.
In actual fact you could just replace the whole of this file with AsyncStream<Element>. This opens up all the functions that AsyncSequence has eg map, filter, compactMap, first, etc and cancellation will come for free.

Copy link
Member Author

@NeedleInAJayStack NeedleInAJayStack Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had created this a few years ago before Swift Concurrency existed to abstract different AsyncSequence/PubSub backends (at the time, options included RxSwift and Combine). Since we're switching over to concurrency, I agree, we should switch to having AsyncThrowingStream as a first class member in our API. I've done that work here: 9ec313a

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess @adam-fowler’s point is to use some async sequence in the public APIs instead of the concrete stream, is that correct @adam-fowler ?

This removes the NIO dependency. It is breaking because it removes all Swift NIO-isms that were present in the public APIs (like EventLoopFuture and EventLoopGroup argument/return types).
@NeedleInAJayStack NeedleInAJayStack changed the title Draft: feat!: Uses swift concurrency under the hood BREAKING: Converts to Swift Concurrency Jun 25, 2025
The intent is to replace it with swift-distributed-tracing integration.
This resolves the race condition caused by the inbox counts and the event delivery. If event delivery happens before the subsequent publish increments the inbox counts, then the counts will be lower than expected. Resolved by just not asking for inbox counts, since they aren't relevant to the test.
This was causing test hangs on macOS
@NeedleInAJayStack
Copy link
Member Author

Hey @adam-fowler & @paulofaria - I managed to get the tests passing, and I think this is ready for you guys to look again. Could you give it a pass on my changes following your comments?

.package(url: "https://github.com/apple/swift-collections", .upToNextMajor(from: "1.0.0")),
],
targets: [
.target(
name: "GraphQL",
dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "OrderedCollections", package: "swift-collections"),
]
),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NeedleInAJayStack have you tried enabling strict concurrency?

Comment on lines +47 to +49
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
// (which we cannot know),
// and we need the result to be a concrete type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't GraphQLResult a concrete type, though? What am I missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - GraphQLResult is a concrete type, but the concrete signature of the Stream or Sequence can get really complex as you manipulate it. For example, if I take an AsyncSequence<String> and then call .filter and then .map to get a Bool, then the resulting type will be AsyncMapSequence<AsyncFilterSequence<AsyncSequence<String>>, Bool>. This type conforms to AsyncSequence with Element == Bool, but it's true signature is pretty complex.

The underlying issue here is that trying to call (any AsyncSequence).map will give the compilation error: Member 'map' cannot be used on value of type 'any AsyncSequence'; consider using a generic constraint instead. But using a generic type constraint means that we can't accept any AsyncSequence as our resolver type, which dramatically limits the resolver definition API (for reasons above).

Instead, (any AsyncSequence) does allow a user to iterate it, with each result typed as Any, which we can then bind into a new sequence.

Sorry - I know this is kinda nuanced. Let me know if you have more questions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the sourceStream element be anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use a type erased AsyncSequence to do the mapping (to avoid the additional task. This is a AnyAsyncSequence I use in hummingbird (code was originally stolen from AsyncHTTPClient)

@usableFromInline
struct AnyAsyncSequence<Element>: AsyncSequence {
    @usableFromInline
    typealias AsyncIteratorNextCallback = () async throws -> Element?

    @usableFromInline
    let makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback

    @inlinable
    init<AS: AsyncSequence>(_ base: AS) where AS.Element == Element, AS: Sendable, AS.AsyncIterator: _HB_SendableMetatype {
        self.makeAsyncIteratorCallback = {
            var iterator = base.makeAsyncIterator()
            return {
                try await iterator.next()
            }
        }
    }

    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
        @usableFromInline
        let nextCallback: AsyncIteratorNextCallback

        @usableFromInline
        init(nextCallback: @escaping AsyncIteratorNextCallback) {
            self.nextCallback = nextCallback
        }

        @inlinable
        func next() async throws -> Element? {
            try await self.nextCallback()
        }
    }

    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
        .init(nextCallback: self.makeAsyncIteratorCallback())
    }
}

extension AsyncSequence where Self: Sendable {
    var any: AnyAsyncSequence<Element> { .init(self) }
}

You could use something like this that also includes a mapping closure

type: GraphQLString,
resolve: { eventResult, _, _, _, _ in // Defines how to transform each event when it occurs
return eventResult
},
subscribe: { _, _, _, _, _ in // Defines how to construct the event stream
let asyncStream = AsyncThrowingStream<String, Error> { continuation in
return AsyncThrowingStream<String, Error> { continuation in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that we only support AsyncThrowingStream streams or do we also support any AsyncThrowingSequence?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question! No, this just happens to be the example - within the subscription resolver, we support any AsyncSequence (see here for the subscription type), but the return type of a subscribe() call will always return an AsyncThrowingStream<GraphQLResult, Error>.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! I spent some time looking for that and couldn’t find it, haha. That was my main concern. Please update the README example making that point clear. Other than that, I’m happy with the PR. 🙂

Actually.. I have a follow up question, haha. Why AsyncSequence and not AsyncThrowingSequence?

Comment on lines 49 to 84
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type (which we cannot know),
// and we need the result to be a concrete type.
let subscriptionStream = AsyncThrowingStream<GraphQLResult, Error> { continuation in
let task = Task {
do {
for try await eventPayload in sourceStream {
// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
let newEvent = try await execute(
queryStrategy: queryStrategy,
mutationStrategy: mutationStrategy,
subscriptionStrategy: subscriptionStrategy,
instrumentation: instrumentation,
schema: schema,
documentAST: documentAST,
rootValue: eventPayload,
context: context,
variableValues: variableValues,
operationName: operationName
)
continuation.yield(newEvent)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adam-fowler I still had to wrap this in an unstructured Task because AsyncSequence.map required a concrete type, whereas for try await seemed to work on any AsyncSequence. Also, I was running into MacOS v15 requirements when trying to qualify the AsyncSequence Failure type. That said, I agree with your points on unstructured tasks and it feels kinda gross.

Do you have any suggestions?

type: GraphQLString,
resolve: { eventResult, _, _, _, _ in // Defines how to transform each event when it occurs
return eventResult
},
subscribe: { _, _, _, _, _ in // Defines how to construct the event stream
let asyncStream = AsyncThrowingStream<String, Error> { continuation in
return AsyncThrowingStream<String, Error> { continuation in
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question! No, this just happens to be the example - within the subscription resolver, we support any AsyncSequence (see here for the subscription type), but the return type of a subscribe() call will always return an AsyncThrowingStream<GraphQLResult, Error>.

Comment on lines +47 to +49
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
// (which we cannot know),
// and we need the result to be a concrete type.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - GraphQLResult is a concrete type, but the concrete signature of the Stream or Sequence can get really complex as you manipulate it. For example, if I take an AsyncSequence<String> and then call .filter and then .map to get a Bool, then the resulting type will be AsyncMapSequence<AsyncFilterSequence<AsyncSequence<String>>, Bool>. This type conforms to AsyncSequence with Element == Bool, but it's true signature is pretty complex.

The underlying issue here is that trying to call (any AsyncSequence).map will give the compilation error: Member 'map' cannot be used on value of type 'any AsyncSequence'; consider using a generic constraint instead. But using a generic type constraint means that we can't accept any AsyncSequence as our resolver type, which dramatically limits the resolver definition API (for reasons above).

Instead, (any AsyncSequence) does allow a user to iterate it, with each result typed as Any, which we can then bind into a new sequence.

Sorry - I know this is kinda nuanced. Let me know if you have more questions.

@@ -280,10 +262,10 @@ func executeSubscription(
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers
// for query fields.
struct SourceEventStreamResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you don't use Result here, with an error type that can take multiple GraphQLErrors? Can you have a stream and errors at the same time?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants